Flask 源码分析

参考文章: Flask源码分析(一), Flask源码分析(二), Flask源码分析(三)

WSGI

在说到wsgi时我们先看一下面向http的python程序需要关心的内容:

  1. 请求:
    • 请求方法(method)
    • 请求地址(url)
    • 请求内容(body)
    • 请求头(header)
    • 请求环境(environ)
  2. 响应:
    • 响应码(status_code)
    • 响应数据(data)
    • 响应头(header)

wsgi要做的就是关于程序端和服务端的标准规范, 将服务程序接收到的请i去传递给python程序, 并将网络的数据流和python的结构体进行转换.
它规定了python程序必须是一个可调用对象(实现了call函数的方法或类), 接受两个参数environ(WSGI的环境信息)和start_response(开始响应请求的函数), 并返回可迭代的结果. 直接上代码来实现一个最简单的web程序返回hello world:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from werkzeug.serving import run_simple

class WebClass:
def __init__(self):
pass

def __call__(self, environ, start_response):
status = '200 OK'
response_headers = [('Content-type', 'text/plain')]
start_response(status, response_headers)
yield str.encode("Hello World!\n")

if __name__ == "__main__":
app = WebClass()
run_simple("127.0.0.1", 5000, app)

WebClass正是实现了__call__方法的可调用对象, 接受environ和start_respone, 并在返回之前调用start_response, start_response接受两个必须的参数, status_code(http状态码)和response_header(响应头), yield hello world正是要求的可迭代结果, 现在这个类只是实现了最简单的功能, 路由注册, 模板渲染等都没有实现.这里用了werkzeug提供的run_simple, 其实我们创建flask应用, 跑起来的时候调用的也是这个函数,后面将会讲到。

项目运行

app.run(), 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def run(self, host=None, port=None, debug=None, load_dotenv=True, **options):
# 是否是 cli 启动
if os.environ.get("FLASK_RUN_FROM_CLI") == "true":
from .debughelpers import explain_ignored_app_run

explain_ignored_app_run()
return

if get_load_dotenv(load_dotenv):
cli.load_dotenv()

# if set, let env vars override previous values
if "FLASK_ENV" in os.environ:
self.env = get_env()
self.debug = get_debug_flag()
elif "FLASK_DEBUG" in os.environ:
self.debug = get_debug_flag()

# debug passed to method overrides all other sources
if debug is not None:
self.debug = bool(debug)

_host = "127.0.0.1"
_port = 5000
server_name = self.config.get("SERVER_NAME")
sn_host, sn_port = None, None

if server_name:
sn_host, _, sn_port = server_name.partition(":")

host = host or sn_host or _host
# pick the first value that's not None (0 is allowed)
port = int(next((p for p in (port, sn_port) if p is not None), _port))

options.setdefault("use_reloader", self.debug)
options.setdefault("use_debugger", self.debug)
# 默认是多线程启动的
options.setdefault("threaded", True)
# 显示 banner 信息
cli.show_server_banner(self.env, self.debug, self.name, False)

from werkzeug.serving import run_simple

try:
# 运行 run_simple
run_simple(host, port, self, **options)
finally:
self._got_first_request = False

上面函数的功能很简单,处理了以下参数,其中最主要的还是调用 werkzeug 的 run_simple 函数, run_simple 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def run_simple(
hostname,
port,
application,
use_reloader=False,
use_debugger=False,
use_evalex=True,
extra_files=None,
reloader_interval=1,
reloader_type="auto",
threaded=False,
processes=1,
request_handler=None,
static_files=None,
passthrough_errors=False,
ssl_context=None,
):

if not isinstance(port, int):
raise TypeError("port must be an integer")
if use_debugger:
from .debug import DebuggedApplication

application = DebuggedApplication(application, use_evalex)
if static_files:
from .middleware.shared_data import SharedDataMiddleware

application = SharedDataMiddleware(application, static_files)
# 启动后的显示信息,包含运行地址,端口等
def log_startup(sock):
display_hostname = hostname if hostname not in ("", "*") else "localhost"
quit_msg = "(Press CTRL+C to quit)"
if sock.family == af_unix:
_log("info", " * Running on %s %s", display_hostname, quit_msg)
else:
if ":" in display_hostname:
display_hostname = "[%s]" % display_hostname
port = sock.getsockname()[1]
_log(
"info",
" * Running on %s://%s:%d/ %s",
"http" if ssl_context is None else "https",
display_hostname,
port,
quit_msg,
)

def inner():
try:
fd = int(os.environ["WERKZEUG_SERVER_FD"])
except (LookupError, ValueError):
fd = None
# 获取一个 wsgi 对象
srv = make_server(
hostname,
port,
application,
threaded,
processes,
request_handler,
passthrough_errors,
ssl_context,
fd=fd,
)
if fd is None:
log_startup(srv.socket)
# 调用这个 wsgi 对象的 serve_forever 方法,让其项目处于运行状态
srv.serve_forever()

if use_reloader:
if not is_running_from_reloader():
if port == 0 and not can_open_by_fd:
raise ValueError(
"Cannot bind to a random port with enabled "
"reloader if the Python interpreter does "
"not support socket opening by fd."
)
address_family = select_address_family(hostname, port)
server_address = get_sockaddr(hostname, port, address_family)
s = socket.socket(address_family, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(server_address)
if hasattr(s, "set_inheritable"):
s.set_inheritable(True)
if can_open_by_fd:
os.environ["WERKZEUG_SERVER_FD"] = str(s.fileno())
s.listen(LISTEN_QUEUE)
log_startup(s)
else:
s.close()
if address_family == af_unix:
_log("info", "Unlinking %s" % server_address)
os.unlink(server_address)
from ._reloader import run_with_reloader

run_with_reloader(inner, extra_files, reloader_interval, reloader_type)
else:
# 调用 inner 函数
inner()

run_simple 中最后调用了其中的 inner 函数,inner 中的 make_server 函数返回一个 WSGIServer 对象,然后再调用 WSGIServer 对象的 serve_forever 方法,创建 wsgi 的服务, 然后运行,这样项目就运行起来了。

请求处理

前面说了wsgi规定应用程序必须实现__call__方法, 找到Flask对应的内容:

1
2
def __call__(self, environ, start_response):
return self.wsgi_app(environ, start_response)

wsgi_app 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def wsgi_app(self, environ, start_response):
# 获取 RequestContext 对象
ctx = self.request_context(environ)
error = None
try:
try:
ctx.push()
# full_dispatch_request 中做了很多事情,包含请求钩子处理,请求处理等
response = self.full_dispatch_request()
except Exception as e:
error = e
response = self.handle_exception(e)
except:
error = sys.exc_info()[1]
raise
return response(environ, start_response)
finally:
if self.should_ignore_error(error):
error = None
ctx.auto_pop(error)

full_dispatch_request 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
def full_dispatch_request(self):
self.try_trigger_before_first_request_functions()
try:
request_started.send(self)
# 判断是否有 @before_first_request,@before_request 装饰的钩子函数,如果有调用
rv = self.preprocess_request()
if rv is None:
rv = self.dispatch_request()
except Exception as e:
rv = self.handle_user_exception(e)
# 将请求结果通过钩子函数处理(如果有 @after_request 装饰的函数,那么就处理,不管请求是否异常都要执行的 teardown_request 函数)
return self.finalize_request(rv)

这段最核心的就是 dispatch_request , dispatch_request 就是我们注册的路由函数的执行结果, 在 dispatch_request 之前我们看到 preprocess_request , 它的作用是将钩子函数处理一下

  1. 第一次请求处理之前的钩子函数, 通过 before_first_request 定义的
  2. 每个请求处理之前的钩子函数, 通过 before_request 定义的

dispatch_request 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def dispatch_request(self):
req = _request_ctx_stack.top.request

# 如果有 routing_exception(即 URL 路由异常),则抛出该异常
if req.routing_exception is not None:
self.raise_routing_exception(req)
rule = req.url_rule

# 如果 URL 规则要求自动提供 OPTIONS 方法,并且当前请求是 OPTIONS 请求
if (getattr(rule, "provide_automatic_options", False) and req.method == "OPTIONS"):
return self.make_default_options_response()

# 根据 URL 规则的 endpoint 获取相应的视图函数,并传递 req.view_args 参数
return self.view_functions[rule.endpoint](**req.view_args)

而在 dispat_request 之后还有 finalize_request 函数, 它的作用同样是将请求结果通过钩子函数处理一下:

  1. 每个请求正常处理之后的 hook 函数,通过 after_request 定义
  2. 不管请求是否异常都要执行的 teardown_request hook 函数 所以上面最重要的就是 dispatch_request 函数, 找到我们注册的路由函数, 并返回

preprocess_request 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  def preprocess_request(self):
# 获取当前请求对象的 blueprint
bp = _request_ctx_stack.top.request.blueprint
# None 是获取作用于全局的函数
funcs = self.url_value_preprocessors.get(None, ())
# 如果有蓝图,并且蓝图在 url_value_preprocessors 中
if bp is not None and bp in self.url_value_preprocessors:
funcs = chain(funcs, self.url_value_preprocessors[bp]) # 将蓝图上的函数与全局的函数进行合并
for func in funcs:
func(request.endpoint, request.view_args) # 执行
# 获取在 app 上注册的 before_request_funcs,可能会包含全局的和蓝图上的处理函数
funcs = self.before_request_funcs.get(None, ())
if bp is not None and bp in self.before_request_funcs:
funcs = chain(funcs, self.before_request_funcs[bp]) # 将蓝图上的函数与全局的函数进行合并
for func in funcs: # 执行 before_request_funcs
rv = func()
if rv is not None:
return rv

url_value_preprocessors 是一个字典,用于存储预处理请求 URL 中的值的函数。这些函数可以是全局的,也可以是与特定蓝图(Blueprint)关联的。当一个请求进入应用时,Flask 需要根据请求的蓝图确定需要执行哪些预处理函数。

考虑以下情况:您可能在应用级别注册了一些全局的 url_value_preprocessors,同时在特定的蓝图上也注册了一些预处理函数。在处理请求时,您可能希望先执行全局的预处理函数,然后再执行与当前蓝图关联的预处理函数。

这就是 funcs = chain(funcs, self.url_value_preprocessors[bp]) 这句代码的作用。它使用 itertools.chain 函数,将全局的预处理函数和与当前蓝图关联的预处理函数合并成一个生成器。生成器会依次返回这些函数,让 Flask 在适当的时候调用它们来预处理请求 URL 中的值。

要注意的是,itertools.chain 并不会立即执行这些函数,而是在迭代过程中依次执行它们。这样可以确保在请求处理之前按照合适的顺序执行所有的预处理函数。

finalize_request 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def finalize_request(self, rv, from_error_handler=False):
# 创建一个 response 对象
response = self.make_response(rv)
try:
# 处理返回前的钩子函数
response = self.process_response(response)
request_finished.send(self, response=response)
except Exception:
if not from_error_handler:
raise
self.logger.exception(
"Request finalizing failed with an error while handling an error"
)
return response

process_response 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
def process_response(self, response):
ctx = _request_ctx_stack.top
bp = ctx.request.blueprint
funcs = ctx._after_request_functions
if bp is not None and bp in self.after_request_funcs:
funcs = chain(funcs, reversed(self.after_request_funcs[bp]))
if None in self.after_request_funcs:
funcs = chain(funcs, reversed(self.after_request_funcs[None]))
for handler in funcs:
response = handler(response)
if not self.session_interface.is_null_session(ctx.session):
self.session_interface.save_session(self, ctx.session, response)
return response

响应

make_response 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def make_response(self, rv):
status = headers = None
# 如果返回是一个 tuple
if isinstance(rv, tuple):
len_rv = len(rv)

# 返回的长度为 3,那么分别就是 response 内容,http status,http headers
if len_rv == 3:
rv, status, headers = rv
# 如果返回长度只有两个
elif len_rv == 2:
# 且最后一个是 dict, tuple, list 格式的,那么返回是 response 内容和 headers
if isinstance(rv[1], (Headers, dict, tuple, list)):
rv, headers = rv
else:
# 否则返回内容就是 response 内容和 status
rv, status = rv
# 其他格式则不支持
else:
raise TypeError(
"The view function did not return a valid response tuple."
" The tuple must have the form (body, status, headers),"
" (body, status), or (body, headers)."
)

# 返回响应内容则不支持
if rv is None:
raise TypeError(
"The view function did not return a valid response. The"
" function either returned None or ended without a return"
" statement."
)

# 确保 response 是响应类的实例
if not isinstance(rv, self.response_class):
if isinstance(rv, (text_type, bytes, bytearray)):
rv = self.response_class(rv, status=status, headers=headers)
status = headers = None
elif isinstance(rv, dict):
rv = jsonify(rv)
elif isinstance(rv, BaseResponse) or callable(rv):
try:
rv = self.response_class.force_type(rv, request.environ)
except TypeError as e:
new_error = TypeError(
"{e}\nThe view function did not return a valid"
" response. The return type must be a string, dict, tuple,"
" Response instance, or WSGI callable, but it was a"
" {rv.__class__.__name__}.".format(e=e, rv=rv)
)
reraise(TypeError, new_error, sys.exc_info()[2])
else:
raise TypeError(
"The view function did not return a valid"
" response. The return type must be a string, dict, tuple,"
" Response instance, or WSGI callable, but it was a"
" {rv.__class__.__name__}.".format(rv=rv)
)

if status is not None:
if isinstance(status, (text_type, bytes, bytearray)):
rv.status = status
else:
rv.status_code = status
if headers:
rv.headers.extend(headers)
return rv

路由匹配

在 flask 中, 构建路由规则有两种方法, 这两种方法其实是一样的,都是调用 add_url_rule 来实现

  1. 通过`@app.route()`的装饰器, 上面例子用的就是这种方法
  2. 通过app.add_url_rule, 这个方法的签名为 add_url_rule(self, rule, endpoint=None, view_func=None, **options)
1
2
3
4
5
6
def route(self, rule, **options):
def decorator(f):
endpoint = options.pop("endpoint", None)
self.add_url_rule(rule, endpoint, f, **options)
return f
return decorator

add_url_rule 源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@setupmethod
def add_url_rule(
self,
rule,
endpoint=None,
view_func=None,
provide_automatic_options=None,
**options
):
# 如果没有设置 endpoint,那么 endpoint 就是函数名称
if endpoint is None:
endpoint = _endpoint_from_view_func(view_func)
options["endpoint"] = endpoint

# 获取函数对应的方法
methods = options.pop("methods", None)
if methods is None:
methods = getattr(view_func, "methods", None) or ("GET",)
if isinstance(methods, string_types):
raise TypeError(
"Allowed methods have to be iterables of strings, "
'for example: @app.route(..., methods=["POST"])'
)
methods = set(item.upper() for item in methods)

# 获取 required_methods
required_methods = set(getattr(view_func, "required_methods", ()))
if provide_automatic_options is None:
provide_automatic_options = getattr(
view_func, "provide_automatic_options", None
)
if provide_automatic_options is None:
if "OPTIONS" not in methods:
provide_automatic_options = True
required_methods.add("OPTIONS")
else:
provide_automatic_options = False

# 全部 methods
methods |= required_methods

# 获取一个 rule,底层使用的是 werkzeug.routing 的 Rule 类对象
rule = self.url_rule_class(rule, methods=methods, **options)
rule.provide_automatic_options = provide_automatic_options

# 将路由添加进入 view_functions 中,底层使用的是 werkzeug.routing 的 Map 类对象
self.url_map.add(rule)
if view_func is not None:
old_func = self.view_functions.get(endpoint)
if old_func is not None and old_func != view_func:
raise AssertionError(
"View function mapping is overwriting an "
"existing endpoint function: %s" % endpoint
)
self.view_functions[endpoint] = view_func

可以发现这个函数主要做的就是更新 app 的 url_map 和 view_functions 这两个变量.查找定义, 发现 url_map 是 werkzeug.routing 的Map 类对象, rule 是 werkzeug.routing 的 Rule 类对象, view_functions 就是一个字典, 从上我们也可以知道每个视图函数的 endpoint 必须是不同的.也可以发现, flask 的核心路由逻辑其实实在 werkzeug 中实现的。

dispatch_request 源码如下,在这里面将路由和要处理的函数结合起来,实现执行视图函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def dispatch_request(self):
# 获取当前请求
req = _request_ctx_stack.top.request
if req.routing_exception is not None:
self.raise_routing_exception(req)
# 获取当前请求的 rule
rule = req.url_rule
if (
getattr(rule, "provide_automatic_options", False)
and req.method == "OPTIONS"
):
return self.make_default_options_response()
# 通过 endpoint 匹配视图函数并且执行
return self.view_functions[rule.endpoint](**req.view_args)

上下文

之前在上面我们已经讲到dispatch_request函数在找到view_function后, 只是将最基本的参数传给了view_function, 可是有时这对视图函数来说是远远不够的, 它有时还需要头部(header), body里的数据, 才能正确运行, 可能 最简单的方法就是将所有的这些信息封装成一个对象, 作为参数传给视图函数, 可是这样一来所有的视图函数都需要添加对应的参数, 即使并没有用到它.

flask 的做法是把这些信息作为上下文, 类似全局变量的东西, 在需要的时候, 用 from flask import request 获取, 比如经常用的request.json, request.args, 这里有一个很重要的点就是它们必须是动态的, 在多线程或多协程的情况下, 每个线程或协程获取的必须是自己独特的对象, 不能导入后结果获取的是其他请求的内容, 那就乱套了.

那么flask是如何实现不同的线程协程准确获得自己的上下文的呢, 我们先来看一下这两个上下文的定义:

application context 演化成出两个变量 current_appg

request context 演化出 requestsession

他们的实现正式依靠 Local StackLocal Proxy 类, 正是这两个东西才让我们在并发程序中每个视图函数都会看到属于自己的上下文而不会混乱, 而这两个类能在多线程或多协程情况下实现隔离效果是考了另一个基础类 Local, 实现了类似threading.local的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def _lookup_req_object(name):
top = _request_ctx_stack.top
if top is None:
raise RuntimeError(_request_ctx_err_msg)
return getattr(top, name)

def _lookup_app_object(name):
top = _app_ctx_stack.top
if top is None:
raise RuntimeError(_app_ctx_err_msg)
return getattr(top, name)

def _find_app():
top = _app_ctx_stack.top
if top is None:
raise RuntimeError(_app_ctx_err_msg)
return top.app

_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))
g = LocalProxy(partial(_lookup_app_object, "g"))

为什么不直接使用 LocalStack,而是使用 LocalProxy 呢?

主要原因在于 LocalProxy 的设计目标是为了提供更加简洁和方便的访问方式,使开发人员能够以属性访问的方式轻松地获取当前线程的上下文数据。LocalProxy 可以用来简化代码,并让您的代码更具可读性和可维护性。通过使用 LocalProxy,您可以避免在多个地方重复引用 LocalStack 实例。

Local 代码如下:

__storage__ 是用于存储内容的地方,格式为 {“线程ID”: {“key”:”value”}},因为每个请求线程 ID 不一样,所以 Local 实现了隔离的效果。__ident_func__ 绑定的是 get_ident 函数,作用是获取当前的线程 ID。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
class Local(object):
# 设置只有这两个属性可以被外部访问
__slots__ = ("__storage__", "__ident_func__")

def __init__(self):
# 设置 __storage__ 的初始化值
object.__setattr__(self, "__storage__", {})
# 设置 __ident_func__ 的值,get_ident 获得当前线程的 id
object.__setattr__(self, "__ident_func__", get_ident)

# 遍历 Local
def __iter__(self):
return iter(self.__storage__.items())

def __call__(self, proxy):
"""Create a proxy for a name."""
return LocalProxy(self, proxy)

# 释放
def __release_local__(self):
self.__storage__.pop(self.__ident_func__(), None)
# 获取
# 下面三个方法实现了属性的访问,设置和删除
# 内部都调用了get_ident方法, 获取当前的线程或协程id, 然后一次为键访问值
# 这样外部只是看到访问实例的属性, 其实内部已经实现了线程或协程的切换
def __getattr__(self, name):
try:
return self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name)
# 设置
def __setattr__(self, name, value):
ident = self.__ident_func__()
storage = self.__storage__
try:
storage[ident][name] = value
except KeyError:
storage[ident] = {name: value}
# 删除
def __delattr__(self, name):
try:
del self.__storage__[self.__ident_func__()][name]
except KeyError:
raise AttributeError(name)

Local 是用来提供多线程或多协程的隔离属性访问的, 那么 Local Stack 就提供了隔离的栈访问, 它只要提供了 push, pop, top方法, 主要是栈的一些方法,在 LocalStack 的 push 方法中, 其实是对属性_local也就是 Local 的操作, 也就是先创建一个列表, self._local.storage[ident(当前线程或协程id)]['stack'] = [], 然后其实还是用 append 将 request 请求信息添加进去

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
class LocalStack(object):
def __init__(self):
self._local = Local()

def __release_local__(self):
self._local.__release_local__()

def _get__ident_func__(self):
return self._local.__ident_func__

def _set__ident_func__(self, value):
object.__setattr__(self._local, '__ident_func__', value)
__ident_func__ = property(_get__ident_func__, _set__ident_func__)
del _get__ident_func__, _set__ident_func__

def __call__(self):
def _lookup():
rv = self.top
if rv is None:
raise RuntimeError('object unbound')
return rv
return LocalProxy(_lookup)

# push, pop, top 实现了栈的操作
def push(self, obj):
rv = getattr(self._local, 'stack', None)
if rv is None:
self._local.stack = rv = []
rv.append(obj)
return rv

def pop(self):
stack = getattr(self._local, 'stack', None)
if stack is None:
return None
elif len(stack) == 1:
release_local(self._local)
return stack[-1]
else:
return stack.pop()

@property
def top(self):
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None

上述已经将 Local 和 LocalStack 讲的差不多了。request ,g 的实现,用的是 LocalProxy,LocalProxy 还重写了所有的魔术方法,具体实现都是代理对象,LocalProxy 简要代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@implements_bool
class LocalProxy(object):
__slots__ = ('__local', '__dict__', '__name__', '__wrapped__')

def __init__(self, local, name=None):
object.__setattr__(self, '_LocalProxy__local', local)
object.__setattr__(self, '__name__', name)
if callable(local) and not hasattr(local, '__release_local__'):
# "local" 是一个回调函数
object.__setattr__(self, '__wrapped__', local)
# 获取当前线程或协程对应的对象
def _get_current_object(self):
if not hasattr(self.__local, '__release_local__'):
return self.__local()
try:
return getattr(self.__local, self.__name__)
except AttributeError:
raise RuntimeError('no object bound to %s' % self.__name__)

_request_ctx_stack 代表的就是请求上下文,ctx 其实是 RequestContext,请求来的时候会调用 RequestContext 的 push 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def push(self):
# app_ctx 存放的是 Flask 应用实例,如果不存在,且不是当前的应用实例,那么就创建一个,在进行 push 操作
app_ctx = _app_ctx_stack.top
if app_ctx is None or app_ctx.app != self.app:
app_ctx = self.app.app_context()
app_ctx.push()
self._implicit_app_ctx_stack.append(app_ctx)
else:
self._implicit_app_ctx_stack.append(None)

if hasattr(sys, "exc_clear"):
sys.exc_clear()
# push 当前 RequestContext 对象到 _request_ctx_stack 中
_request_ctx_stack.push(self)
if self.session is None:
session_interface = self.app.session_interface
self.session = session_interface.open_session(self.app, self.request)

if self.session is None:
self.session = session_interface.make_null_session(self.app)

if self.url_adapter is not None:
self.match_request()

def pop(self, exc=_sentinel):
app_ctx = self._implicit_app_ctx_stack.pop()

try:
clear_request = False
if not self._implicit_app_ctx_stack:
self.app.do_teardown_request(exc)

request_close = getattr(self.request, 'close', None)
if request_close is not None:
request_close()
clear_request = True
finally:
rv = _request_ctx_stack.pop()
if clear_request:
rv.request.environ['werkzeug.request'] = None
if app_ctx is not None:
app_ctx.pop(exc)

def auto_pop(self, exc):
if self.request.environ.get('flask._preserve_context') or \
(exc is not None and self.app.preserve_context_on_exception):
self.preserved = True
self._preserved_exc = exc
else:
self.pop(exc)

push 就是将该请求的 application context (如果 _app_ctx_stack 栈顶不是当前请求所在 app,需要重新创建 app context )和 request context 都保存到相关的栈上, pop 则相反, 做一些出栈清理操作。

现在上下文就比较清楚了,就是每次有请求过来,flask 会创建当前线程或协程需要处理的两个上下文,并压入隔离的栈。application context针对的是 flask实 例的, 因为 app 实例只有一个, 所以多个请求其实是公用一个 application context, 而 request context 是每次请求过来都要创建的,在请求结束时又出栈, 所以两个的生命周期时不同的, 也就是 application context 的周期就是实例的生命周期, 而request context 的生命周期取决于请求存在的时间。

flask: app.run() 时, 并没有看到哪里启动了多线程, 理论上在单线程的情况下, 只有一个请求处理完成之后才能处理下一个请求, 那么上面为什么能同时处理多个请求, 哪里创建了多线程呢?
在flask的启动函数(run)中, 有这么一个源码:

1
options.setdefault("threaded", True)

有了 LocalStack 为什么还需要 LocalProxy?LocalProxy 起到了什么作用?

LocalStack 是 Flask 中用于在每个线程中管理上下文数据的工具。它在多线程环境下为每个线程维护一个独立的数据栈,确保数据隔离和线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 创建 LocalStack 实例
my_stack = LocalStack()

# 在线程中推入和弹出数据
def worker(data):
my_stack.push(data)
print("Data pushed:", my_stack.top)
my_stack.pop()

# 启动多个线程来演示 LocalStack 的工作
import threading

threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"Thread-{i}",))
threads.append(t)
t.start()

for t in threads:
t.join()

LocalProxy 是 Flask 中的一个代理对象,用于访问 LocalStack 中的数据。它提供了一种简洁的方式来访问当前线程的上下文数据,而不需要显式地引用 LocalStack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 创建 LocalStack 实例
my_stack = LocalStack()

# 将 LocalStack 包装为 LocalProxy
current_data = LocalProxy(my_stack)

# 在线程中推入数据并通过 LocalProxy 访问
def worker(data):
my_stack.push(data)
print("Data pushed:", current_data)
my_stack.pop()

# 启动多个线程来演示 LocalProxy 的工作
import threading

threads = []
for i in range(3):
t = threading.Thread(target=worker, args=(f"Thread-{i}",))
threads.append(t)
t.start()

for t in threads:
t.join()

在上述示例中,LocalStack 用于在每个线程中管理数据栈,而 LocalProxy 则是在访问数据时提供了简洁的方式。通过将 LocalStack 包装为 LocalProxy,您可以通过属性访问的方式来获取当前线程的上下文数据,而无需明确引用 LocalStack

总结:LocalStack 用于管理数据栈的隔离和线程安全,而 LocalProxy 则是提供了方便的访问方式,使您能够轻松地从当前线程的上下文中获取数据。

异常处理

在 Flask 中,异常处理主要涉及以下几个核心模块和类:

  1. werkzeug.exceptions 模块:Werkzeug 是 Flask 的底层库,werkzeug.exceptions 模块定义了许多常见的 HTTP 异常类,如 HTTPExceptionBadRequestNotFound 等。这些异常类用于表示 HTTP 请求和响应中的不同状态码和错误情况。
  2. flask 模块中的异常类:在 Flask 的 flask 模块中,有一些自定义的异常类,如 FlaskExceptionRequestRedirect 等。这些异常类用于处理与 Flask 特定功能和行为相关的错误。
  3. 应用和蓝图中的错误处理装饰器:在 Flask 应用中,您可以使用装饰器 `@app.errorhandler(Exception)` 来为特定异常注册处理函数,从而实现自定义的异常处理逻辑。同样,在蓝图中也有类似的机制。

**@app.errorhandler 源码分析**

`@app.errorhandler` 装饰器是 Flask 提供的一种机制,用于为特定的异常类型注册自定义的错误处理函数。当异常被抛出并且与装饰器中指定的异常类型匹配时,注册的错误处理函数会被调用。让我们来分析一下这个装饰器的源码以及它是如何工作的。

在 Flask 中,`@app.errorhandler装饰器的定义位于flask.app` 模块中。以下是大致的源码结构和解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class Flask:
# ...

def errorhandler(self, code_or_exception):
def decorator(f):
self._register_error_handler(code_or_exception, f)
return f
return decorator

def _register_error_handler(self, code_or_exception, f):
# 将错误处理函数注册到 _error_handlers 字典中
self._error_handlers[code_or_exception] = f

def handle_exception(self, e):
# 查找匹配异常类型的错误处理函数并调用
for code_or_exception, handler in self._error_handlers.items():
if isinstance(e, code_or_exception):
return handler(e)
return None
  1. `@app.errorhandlerFlask类的一个方法,它接受一个异常类型或者状态码作为参数。它返回一个装饰器函数decorator`。
  2. decorator 函数接受一个处理函数 f,并在内部调用 _register_error_handler 方法将处理函数与异常类型或状态码注册到 _error_handlers 字典中。
  3. _register_error_handler 方法将异常类型或状态码作为键,处理函数作为值,注册到 _error_handlers 字典中。
  4. 当应用中抛出异常时,handle_exception 方法会被调用。它会遍历 _error_handlers 字典,查找与异常类型匹配的处理函数,并调用该处理函数来处理异常。

是怎么触发错误处理的?

在 wsgi_app 中,如果 full_dispatch_request 调用失败,则会调用 handle_exception 进行错误处理,在 handle_exception 中,会根据当前的 error 去寻找对应的注册的回调方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def handle_exception(self, e):
exc_type, exc_value, tb = sys.exc_info()
got_request_exception.send(self, exception=e)

if self.propagate_exceptions:
if exc_value is e:
reraise(exc_type, exc_value, tb)
else:
raise e

self.log_exception((exc_type, exc_value, tb))
server_error = InternalServerError()
server_error.original_exception = e
handler = self._find_error_handler(server_error) # 寻找处理错误的 handler,没有则就是 server_error

if handler is not None:
server_error = handler(server_error)

return self.finalize_request(server_error, from_error_handler=True)

信号

Flask 信号机制的核心是 blinker 库,它提供了一种用于实现事件触发和响应的简单方法。blinker 是一个独立的 Python 库,被 Flask 内部用作信号机制的基础。

以下是 Flask 中信号的基本用法:

  1. 导入 blinker 模块:在 Flask 中,您可以通过导入 flask.signals 模块来获得 blinker 的实例。

  2. 创建信号对象:使用 flask.signals.Namespace 类创建信号对象。例如:

    1
    2
    3
    4
    from flask.signals import Namespace

    signals = Namespace()
    my_signal = signals.signal("my-signal")
  3. 连接信号和回调函数:使用 .connect 方法将信号与回调函数关联起来。

    1
    2
    3
    @my_signal.connect
    def my_signal_handler(sender, **kwargs):
    print("Signal received!")
  4. 触发信号:在适当的时候,通过调用信号对象的 .send 方法来触发信号。您可以在应用中的某个地方触发信号,然后相关的回调函数会被执行。

    1
    my_signal.send(None)

在 Flask 中,有一些内置的信号可以用于不同的事件,例如 request_startedrequest_finishedtemplate_rendered 等。您还可以自定义信号来实现特定的事件响应。

信号机制在 Flask 中的应用范围广泛,它可以用于以下场景:

  • 在请求处理开始和结束时执行一些操作。
  • 在模板渲染完成后执行特定的操作。
  • 在应用启动、关闭或重载时触发自定义操作。
  • 在扩展或插件中进行事件监听和处理。